今天目標會是先處理, TicketModule 中關於票存統計資訊同步的問題。
票存資料主要是紀錄某一個活動( Event )與開出去的票( Ticket ) 之間的關係。也就是一個活動中,開出去多少票,還有多少開出去得票被使用。屬於統計資訊,也就是可以透過所有開出去的票統計出來。
這邊使用快取的方式,紀錄某個時間點累積出來的結果。當開出新的票或是檢入新的參加票。這時,這個統計資料就會被更新。
實踐快取的方法是使用 redis ,並且透過 lua script 的方式來限制一次只有一個執行序能修改統計值。避免同時修改同一個值,造成狀態不一致。
FROM redis
CMD ["sh", "-c", "exec redis-server --requirepass \"$REDIS_PASSWORD\""]
redis:
container_name: ticket-redis
build:
context: .
dockerfile: ./redis_container/Dockerfile
image: ticket-redis
restart: always
ports:
- 6379:6379
environment:
- REDIS_PASSWORD=${REDIS_PASSWORD}
healthcheck:
test: [ "CMD", "redis-cli", "-a", "${REDIS_PASSWORD}", "--raw", "incr", "ping" ]
interval: 10s
timeout: 5s
retries: 5
logging:
driver: json-file
options:
max-size: 1k
max-file: 3
docker compose up redis -d
pnpm i -S ioredis
local total_key = KEYS[1]..":total"
local join_key = KEYS[1]..":join"
local request_total = tonumber(ARGV[1])
local request_join = tonumber(ARGV[2])
local default_total = tonumber(ARGV[3])
local default_join = tonumber(ARGV[4])
-- initial default value
local total = redis.call("GET", total_key)
if not total then
total = default_total
end
total = tonumber(total)
local join = redis.call("GET", join_key)
if not join then
join = default_join
end
join = tonumber(join)
-- parse request
request_total = tonumber(request_total)
request_join = tonumber(request_join)
-- validate request
local is_valid = 1
if request_total < 0 or request_join < 0 then
is_valid = 0
return {total, join, is_valid, "request invalid"}
end
if request_join > 0 and total < request_join then
is_valid = 0
return {total, join, is_valid, "total not sufficient"}
end
if request_total > 0 and request_join > 0 then
is_valid = 0
return {total, join, is_valid, "request invalid"}
end
-- increase total
if request_total > 0 and request_join == 0 then
total = total + request_total
end
-- increase join, decrease total
if request_join > 0 and total >= request_join then
total = total - request_join
join = join + request_join
end
redis.call("SET", total_key, total)
redis.call("SET", join_key, join)
return {total, join, is_valid, ""}
對於不是 typescript 的部份,在 nestjs 這邊提供了 asset 的選項。需要在 nestjs-cli.json 對 CompilerOptions 作以下設定
"compilerOptions": {
"deleteOutDir": true,
"assets": [
{ "include": "**/*.lua", "outDir": "dist", "watchAssets": true }
],
"watchAssets": true
}
意思是把 src 資料夾下,所有副檔名是 lua 的檔案,都複製打包到 dist 資料夾下。這邊會需要注意,跟載入資料的相對邏輯有關。
import { BadRequestException, Injectable, InternalServerErrorException, Logger, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Redis } from 'ioredis';
import * as fs from 'fs/promises';
import * as path from 'path';
import { EventCounterDto } from './dto/event-counter.dto';
import { EventCounterEntity } from './schema/event-counter.entity';
@Injectable()
export class RedisService implements OnModuleDestroy {
private logger: Logger = new Logger(RedisService.name);
private connect_url: string = '';
private script_sha: string = '';
private connection: Redis;
constructor(
private readonly configService: ConfigService,
) {
this.connect_url = this.configService.getOrThrow<string>('REDIS_URL');
this.connection = new Redis(
this.connect_url,
{
reconnectOnError(err:Error) {
const targetError = 'READONLY';
if (err.message.includes(targetError)) {
// Only reconnect when the error contains "READONLY"
return true; // or `return 1;`
}
},
retryStrategy(times) {
const delay = Math.min(times * 50, 2000);
return delay;
}
});
// setup on event binding
this.connection.on('connect', this.handleConnect.bind(this));
this.connection.on('ready', this.handleReady.bind(this));
this.connection.on('close', this.handleClose.bind(this));
this.connection.on('error', this.handleError.bind(this));
this.connection.on('reconnecting', this.handleReconnecting.bind(this));
this.connection.on('end', this.handleEnd.bind(this));
this.loadLuaScript();
}
private handleConnect() {
this.logger.log({
message: 'redis connecting...',
url: this.connect_url,
type: 'REDIS_CONNECTING'
});
}
private handleReady() {
this.logger.log({
message: 'redis connected',
url: this.connect_url,
type: 'REDIS_CONNECTED'
});
}
private handleClose() {
this.logger.warn({
message: 'redis disconnected',
url: this.connect_url,
type: 'REDIS_DISCONNECTED'
});
}
private handleError(err: unknown) {
const error: Error = err as Error;
this.logger.error({
message: 'redis error occured',
url: this.connect_url,
type: 'REDIS_ERROR',
err: error,
})
}
private handleReconnecting() {
this.logger.log({
message: 'redis reconnecting',
url: this.connect_url,
type: 'REDIS_RECONNECTING',
});
}
private handleEnd() {
this.logger.log({
message: 'redis connection ended',
url: this.connect_url,
type: 'REDIS_CONNECTION_ENDED'
});
}
onModuleDestroy() {
this.connection.disconnect(false);
}
private async checkLuaScript(sha: string) {
return new Promise<boolean>((resolve, reject) => {
if (sha === '') {
resolve(false);
}
this.connection.script('EXISTS' ,sha, (err, result)=> {
if (err) {
reject(err);
}
resolve(result[0]===1);
})
});
}
async hasKey(key: string) {
return new Promise<boolean>((resolve, reject) => {
this.connection.get(`${key}:total`, (err, result) => {
if (err) {
reject(err);
return
}
resolve(result != null);
})
});
}
async getCount(key: string):Promise<EventCounterEntity> {
try {
const totalPromise = new Promise<string>((resolve, reject) => {
this.connection.get(`${key}:total`, (err, result) => {
if (err) {
reject(err);
return;
}
resolve(result);
});
});
const joinPromise = new Promise<string>((resolve, reject) => {
this.connection.get(`${key}:join`, (err, result) => {
if (err) {
reject(err);
return;
}
resolve(result);
});
});
const [total, join] = await Promise.all([totalPromise, joinPromise]);
return {
eventId: key,
totalTicketNumber: parseInt(total),
attendeeNumber: parseInt(join),
}
} catch (err: unknown) {
const error: Error = err as Error;
}
}
async executeLuaScript(eventCounterDto: EventCounterDto) {
try {
const isLoad = await this.checkLuaScript(this.script_sha);
if (!isLoad) {
await this.loadLuaScript();
}
const resultPromise = new Promise((resolve, reject) => {
this.connection.evalsha(this.script_sha,
1,
eventCounterDto.eventId,
eventCounterDto.requestTotal,
eventCounterDto.requestJoin,
eventCounterDto.accumTotal,
eventCounterDto.accumJoin,
(err, result) => {
if (err) {
reject(err);
}
resolve(result);
});
});
const result = await resultPromise;
const total = parseInt(result[0]);
const join = parseInt(result[1]);
const isValid = parseInt(result[2]) == 1;
const errMessage = result[3];
if (!isValid) {
throw new BadRequestException({
message: errMessage,
type: 'LUA_SCRIPT_EXEC_FAILED'
});
}
return {
total: total,
join: join,
}
} catch (err: unknown) {
const error: Error = err as Error;
if ( error instanceof BadRequestException || error instanceof InternalServerErrorException) {
throw error;
}
throw new InternalServerErrorException({
message: 'execute lua script failed',
error: error,
type: 'LUA_SCRIPT_EXEC_FAILED'
})
}
}
private async loadLuaScript() {
try {
const luaScript = await fs.readFile(path.join(__dirname, '..','lua', 'event_counter.lua'));
const resultPromise = new Promise<string>((resolve, reject) => {
this.connection.script('LOAD', luaScript , (err, result) => {
if (err) {
reject(err);
}
const sha: string = result as string;
this.script_sha = sha;
this.logger.log({
message: 'lua sript loaded',
script_sha: sha,
type: 'LUA_SCRIPT_LOAD'
});
resolve(sha);
});
});
return await resultPromise;
} catch (err: unknown) {
const error: Error = err as Error;
this.logger.error({message: error.message, error});
throw new InternalServerErrorException({
message: 'luascript load failed',
error: error,
type: 'LUA_LOAD_FAILED'
});
}
}
}
import { Injectable, InternalServerErrorException, NotFoundException } from '@nestjs/common';
import { EventCounterRespository } from './event-counter.repository';
import { EventCounterEntity } from './schema/event-counter.entity';
import { RedisService } from './redis.service';
@Injectable()
export class EventCounterRedisStore implements EventCounterRespository {
constructor(
private readonly redisService: RedisService
) {}
async get(eventId: string): Promise<EventCounterEntity> {
try {
const hasKey = await this.redisService.hasKey(eventId);
if (!hasKey) {
throw new NotFoundException({
message: 'event id not initial or not existed', eventId });
}
return this.redisService.getCount(eventId);
} catch (err: unknown) {
const error: Error = err as Error;
if (error instanceof NotFoundException) {
throw error;
}
throw new InternalServerErrorException({
message: 'event counter get error',
error: error,
type: 'COUNTER_GET_ERROR'
})
}
}
async verifyIncr(eventId: string, ticketNumber: number, accumAttendee: number, accumTicket: number): Promise<EventCounterEntity> {
const resultCounter = new EventCounterEntity();
resultCounter.eventId = eventId;
const result = await this.redisService.executeLuaScript({
eventId: eventId,
requestJoin: ticketNumber,
requestTotal:0,
accumTotal: accumTicket,
accumJoin: accumAttendee,
});
resultCounter.attendeeNumber = result.join;
resultCounter.totalTicketNumber = result.total;
return resultCounter;
}
async ticketIncr(eventId: string, ticketNumber: number, accumAttendee: number, accumTicket: number): Promise<EventCounterEntity> {
const resultCounter = new EventCounterEntity();
resultCounter.eventId = eventId;
const result = await this.redisService.executeLuaScript({
eventId: eventId,
requestJoin: 0,
requestTotal: ticketNumber,
accumTotal: accumTicket,
accumJoin: accumAttendee,
});
resultCounter.attendeeNumber = result.join;
resultCounter.totalTicketNumber = result.total;
return resultCounter;
}
}
import { BadRequestException, Inject, Injectable } from '@nestjs/common';
import { IncreaseAttendeeDto, IncreaseTicketDto, InitialCounterDto } from './dto/ticket.dto';
import { EventCounterRespository } from './event-counter.repository';
import { OnEvent } from '@nestjs/event-emitter';
import { CreateCounterEvent, CreateTicketEvent, VerifyTicketEvent } from './dto/ticket.event';
import { TicketsRepository } from './tickets.repository';
import { TicketDbStore } from './ticket-db.store';
import { EventCounterRedisStore } from './event-couter.redis.store';
@Injectable()
export class EventsCounterService {
constructor(
@Inject(TicketDbStore)
private readonly ticketRepo: TicketsRepository,
@Inject(EventCounterRedisStore)
private readonly eventCounterRepo: EventCounterRespository,
) {}
async initCounter(initRequestDto: InitialCounterDto) {
const result = await this.eventCounterRepo.ticketIncr(initRequestDto.eventId,0,0,0);
return result;
}
async increaseTicketCount(increaseTicketRequestDto: IncreaseTicketDto) {
let result;
try {
const currentResult = await this.eventCounterRepo.get(increaseTicketRequestDto.eventId);
result = await this.eventCounterRepo.ticketIncr(increaseTicketRequestDto.eventId, increaseTicketRequestDto.ticketNumber, currentResult.attendeeNumber, currentResult.totalTicketNumber);
} catch(error) {
// fetch data from db
const count = await this.ticketRepo.getCounts({ eventId: increaseTicketRequestDto.eventId });
result = await this.eventCounterRepo.ticketIncr(increaseTicketRequestDto.eventId, increaseTicketRequestDto.ticketNumber, count.accumAttendee, count.accumTickets);
}
return result;
}
async increaseAttendeeCount(increaseAttendeeRequestDto: IncreaseAttendeeDto) {
const currentResult = await this.eventCounterRepo.get(increaseAttendeeRequestDto.eventId);
if (currentResult.totalTicketNumber < currentResult.attendeeNumber + increaseAttendeeRequestDto.ticketNumber) {
throw new BadRequestException(`increase attendee number should not large than total ticket number`);
}
const result = await this.eventCounterRepo.verifyIncr(increaseAttendeeRequestDto.eventId, increaseAttendeeRequestDto.ticketNumber, currentResult.attendeeNumber, currentResult.totalTicketNumber);
return result;
}
@OnEvent('create-counter-event')
async handleCreateCounter(payload: CreateCounterEvent) {
await this.initCounter(payload);
}
@OnEvent('create-ticket-event')
async handleCreateTicket(payload: CreateTicketEvent) {
await this.increaseTicketCount(payload);
}
@OnEvent('verify-ticket-event')
async handleVerifyTicket(payload: VerifyTicketEvent) {
await this.increaseAttendeeCount(payload);
}
}
import { Module } from '@nestjs/common';
import { TicketsService } from './tickets.service';
import { TicketsController } from './tickets.controller';
import { EventsCounterService } from './events-counter.service';
import { TicketDbStore } from './ticket-db.store';
import { TypeOrmModule } from '@nestjs/typeorm';
import { TicketEntity } from './schema/ticket.entity';
import { RedisService } from './redis.service';
import { EventCounterRedisStore } from './event-couter.redis.store';
@Module({
imports: [ TypeOrmModule.forFeature([TicketEntity])],
providers: [TicketsService, EventsCounterService, TicketDbStore,
RedisService, EventCounterRedisStore,
],
controllers: [TicketsController]
})
export class TicketsModule {}
pnpm i -D @testcontainer/redis
const initRedis = async() => {
const redis = await new RedisContainer()
.withPassword('123456')
.withPrivilegedMode()
.start();
const REDIS_URL = redis.getConnectionUrl();
process.env.REDIS_URL = REDIS_URL;
global.redis = redis;
}
const init = async () => {
await initPostgresql();
await initRedis();
}
beforeAll(async () => {
postgresql = global.postgresql;
redis = global.redis;
const moduleFixture: TestingModule = await Test.createTestingModule({
imports: [AppModule],
}).compile();
app = moduleFixture.createNestApplication();
app.useGlobalPipes(new ValidationPipe({
whitelist: true,
transform: true,
}))
await app.init();
});
afterAll(async () => {
await app.close();
await postgresql.stop();
await redis.stop();
})
pnpm test:watch
1.執行 unit test 測試修改後的行為一致
pnpm test:watch
接下來的篇章會,開始講述如何加上其他細部的優化的功能,比如格式化驗證,gracefulshutdown 等等。
有著原本的測試 spec 保護,可以很容易在實做中檢驗新的實做行為是否正確。這就是測試先行的好處,除了可以用來驗證系統設計的好壞。也可以檢驗說當下要需求是有被實做滿足。
雖然說測試能夠檢驗出行為,可是實際去執行驗證時。有時還是會需要經過 logger 來對當下的值作查證。後續會講到,如何使用 winston 套件作格式化 logger。